package profiterole.mapreduce;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

import profiterole.api.OnUpdateStatusCallback;
import profiterole.api.Waffle;
import profiterole.mapreduce.MapCallback.OutputUnit;

public class MapReduceService<TMapInput> {

	/**
	 * Historical default worker-thread count used by the two-argument overload.
	 */
	private static final int DEFAULT_NUM_THREADS = 25;

	/**
	 * Shared stateless no-op listener used when the caller supplies none, so
	 * the worker loop never needs a null check on the listener.
	 */
	private static final OnUpdateStatusCallback NO_OP_LISTENER = new OnUpdateStatusCallback() {

		@Override
		public void onMediumTwo(String info) {
		}

		@Override
		public void onMediumOne(String info) {
		}

		@Override
		public void onInit(String info) {
		}

		@Override
		public void onFinish(String info) {
		}
	};

	/**
	 * Runs the map/reduce pipeline without status reporting.
	 *
	 * @param input the units of work to map; {@code null} yields an empty result
	 * @return the reduced result from {@link Reducer#reduce}
	 */
	public static <TMapInput> Waffle<?> mapReduce(
			List<TMapInput> input) {
		return mapReduce(input, NO_OP_LISTENER);
	}

	/**
	 * Runs the map/reduce pipeline with the default pool size
	 * ({@value #DEFAULT_NUM_THREADS} threads).
	 *
	 * @param input    the units of work to map; {@code null} yields an empty result
	 * @param listener progress callback; {@code null} yields an empty result
	 * @return the reduced result from {@link Reducer#reduce}
	 */
	public static <TMapInput> Waffle<?> mapReduce(
			List<TMapInput> input, OnUpdateStatusCallback listener) {
		return mapReduce(input, listener, DEFAULT_NUM_THREADS);
	}

	/**
	 * General direction: map will be by jobs — the mapping will be allocating
	 * jobs to reducers ==&gt; and applying reducer ==&gt; all multi threaded,
	 * then linearly return results.
	 *
	 * <p>Each input element is submitted as one mapper task to a fixed thread
	 * pool; partial maps are collected in completion order and handed to
	 * {@link Reducer#reduce}. A failed mapper task is logged and skipped
	 * (best-effort). If the collecting thread is interrupted, the interrupt
	 * flag is restored and whatever has completed so far is reduced.
	 *
	 * @param input      the units of work to map; {@code null} yields an empty result
	 * @param listener   progress callback; {@code null} yields an empty result
	 * @param numThreads size of the worker pool; must be positive
	 * @return the reduced result from {@link Reducer#reduce}
	 */
	public static <TMapInput> Waffle<?> mapReduce(
			List<TMapInput> input, OnUpdateStatusCallback listener, int numThreads) {

		// Library code checks parameters for validity; invalid input
		// degrades to reducing an empty collection rather than throwing.
		if (input == null || listener == null || numThreads <= 0) {
			return Reducer.reduce(new ArrayList<HashMap<String, Integer>>());
		}

		MapCallback<TMapInput> mapper = new MapCallback<TMapInput>();
		List<HashMap<String, Integer>> maps =
				new ArrayList<HashMap<String, Integer>>(input.size());
		ExecutorService pool = Executors.newFixedThreadPool(numThreads);
		CompletionService<OutputUnit> futurePool =
				new ExecutorCompletionService<MapCallback.OutputUnit>(pool);

		listener.onInit("Starting tasks pool");
		try {
			// Linear addition of jobs; execution happens in parallel on the pool.
			for (TMapInput m : input) {
				futurePool.submit(mapper.makeWorker(m));
			}

			// No further submissions: lets the pool wind down once the queue drains.
			pool.shutdown();

			int n = input.size();
			for (int i = 0; i < n; i++) {
				listener.onMediumOne("Processing task: " + i + " out of: " + n);
				try {
					// take() hands back results in completion order, not submission order.
					maps.add(futurePool.take().get().getMap());
				} catch (InterruptedException e) {
					// Restore the interrupt flag and stop waiting; reduce what we have.
					Thread.currentThread().interrupt();
					break;
				} catch (ExecutionException e) {
					// Best-effort: a failed mapper task is logged and skipped,
					// preserving the original swallow-and-continue behavior.
					e.printStackTrace();
				}
			}
		} finally {
			// Harmless after an orderly drain; cancels pending work if an
			// exception or interrupt exited the loop early (prevents a thread leak).
			pool.shutdownNow();
		}

		Waffle<?> result = Reducer.reduce(maps);
		listener.onFinish("Almost done size of set: " + result.getSortedUnModifiableList().size());

		return result;
	}
}